あのMaterializeがクラウドに!「Materialize Cloud」を使ってみた
大阪オフィスの玉井です。
以前、Materializeというツールを紹介しました(私が書いた記事の中では比較的多く読まれているようです)。
今回、そのMaterializeをクラウド越し利用できるMaterialize Cloudがオープンβ版として登場しました。無料でトライアル出来るとのことで、早速使ってみました。
やってみた
下記のチュートリアルを参考にします。
検証環境
- macOS Big Sur11.5.2
- psql 13.4
Materialize Cloudの準備
上記ウェブサイトからRegisterします。無料です。
アカウントを設定して、メールで送られてきたURL先でログインすると、あっという間にMaterialize Cloudに入ることができます。
ちなみに本当は、最初に、Materialize社の方が、オンボーディングのためのMTGをセッティングしてくれるのですが(というか、そのMTGを経ないと利用できない)、「英語が話せないから、先にセルフで環境触らせてくれへん?」って英語でサポートにメールしたら、快諾してくれました。ありがとうございますMaterializeのみなさん。
Materialize Cloudは、Materializeのインスタンスを起動→そのインスタンスに接続して使用、という流れで使います。ということで、まずはインスタンス作成画面に移動します。
無料のオープンβ版なので、インスタンスサイズはXSのみ、インスタンスが稼働するパブリッククラウドはAWSのみとなっております。ちなみに、インスタンスの名前は自動で付与してくれます(もちろん好きな名前にすることができますが、面倒だったので、このままでいきます)。
インスタンス作成ボタンをポチッと押すと、あっという間にインスタンスが立ち上がります。インスタンスの詳細画面を確認すると、インスタンスに対する接続情報を確認することができます。
Materialize Cloudに接続する
今回はpsqlで接続します(ネイティブで対応しています)。Materialize Cloudの画面に、コマンドまでご丁寧に用意してくれているので、それをそのまま使用して、Materialize Cloudインスタンスに接続します。ちなみに、SSL接続になるので、先に証明書ファイルをダウンロードしておく必要があります。忘れないようにしましょう。
> psql "postgresql://[email protected]:6875/materialize?sslmode=require&sslcert=materialize.crt&sslkey=materialize.key&sslrootcert=ca.crt" psql (13.4, server 9.5.0) SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off) Type "help" for help. materialize=>
ストリーミングデータに対してSQLを実行してみる
では、ここからは、Materializeの基本にして真骨頂である、ストリーミングデータをDBみたいにSQLで問い合わせる、という部分を、実際に試してみたいと思います。
データソースの用意
ストリーミングデータって実際に用意するのがクソ面倒難しいのですが、公式ドキュメントでは、PubNubというサービスのMarket Orders Data Streamを利用する形になっていました。アカウントを作る必要もなく、ウェブサイトにある接続情報を利用するだけで、一瞬でストリーミングデータを利用できるので非常に便利です。
PubNubのマーケットストリーミングデータの情報を確認したら、それをもとにCREATE SOURCE
文を実行します。このSOURCEという概念が、Materializeにおけるデータソースになりますので、Materializeは基本的にはこのクエリから始まると思います。
CREATE SOURCE market_orders_raw FROM PUBNUB SUBSCRIBE KEY 'sub-c-5e072e06-1a7c-11ec-9ca7-5693d1c31269' CHANNEL 'pubnub-market-orders'
SHOWクエリでSOURCEの情報を確認することができます。
materialize=> SHOW COLUMNS FROM market_orders_raw; name | nullable | type ------+----------+------ text | f | text (1 row)
当然ながら、Apache KafkaやAmazon Kinesisといった、主要なストリーミングデータ基盤にも対応しています(CREATE SOURCEできる)。詳しくはドキュメントをどうぞ。
ストリーミングデータを対象にしたマテビューを作ってみる
SOURCEを作り、PubNubから流れ続けるデータをMaterializeで補足する準備が整いました。これを基にして、マテビューを作っていきます。
まずは、普通のVIEWを作ります。これはまだマテビューではないので、実態はただのクエリです(ストリーミングデータのjsonb型を行列形式に変換しているだけ)。
CREATE VIEW market_orders AS SELECT ((text::jsonb)->>'bid_price')::float AS bid_price, (text::jsonb)->>'order_quantity' AS order_quantity, (text::jsonb)->>'symbol' AS symbol, (text::jsonb)->>'trade_type' AS trade_type, to_timestamp(((text::jsonb)->'timestamp')::bigint) AS ts FROM market_orders_raw
上記のVIEWを基に、マテビューを作成します。
CREATE MATERIALIZED VIEW avg_bid AS SELECT symbol, AVG(bid_price) AS AVG FROM market_orders GROUP BY symbol
マテビューを参照してみます。
materialize=> SELECT * FROM avg_bid; symbol | avg -------------+-------------------- Apple | 191.27232718467712 Google | 308.4405372142792 Elerium | 178.60857986211778 Bespin Gas | 172.4710697134336 Linen Cloth | 225.99615881840387 (5 rows)
みなさん御存知の通り、マテビューというのは、クエリだけでなく、実際の結果も保存しておく部分が、普通のVIEWとの違いです。ただ、このMaterializeのマテビューのすごいところは、流れ続けるストリーミングデータを常に追い続けるようになってるところです。結果を常に更新し続けているマテビュー、というイメージです。
ですので、下記のように実行するたびにマテビューの結果も変わります。常に最新の結果が取得できます。まさにリアルタイム。
ちなみに、TAIL
という関数を使用すると、そのマテビューの(ストリーミングによる)変更状態を監視することができます。
COPY(TAIL avg_bid) TO stdout
結合を試してみる
Materializeは一般的なSQLが使えますが、もちろんJOINも使用できます。
まずは結合を試すためのテーブルを作ります。見て分かる通り、これはただの固定値が入ってるだけです。
CREATE TABLE symbols( symbol text, ticker text )
INSERT INTO symbols SELECT * FROM (VALUES('Apple', 'AAPL'),('Google', 'GOOG'),('Elerium', 'ELER'),('Bespin Gas', 'BGAS'),('Linen Cloth', 'LCLO'))
そして、マテビューを作ります。ここで先程作ったテーブルを、symbol
というキーで結合しています。
CREATE MATERIALIZED VIEW cnt_ticker AS SELECT s.ticker AS ticker, COUNT(*) AS cnt FROM market_orders m JOIN symbols s ON m.symbol = s.symbol GROUP BY s.ticker;
結合していても、Materializeのマテビューは、常に最新の結果を返してくれます。
マテビューにフィルタリングを仕込む
一般的なSQLが使用できると言っている以上、WHERE句も、もちろん使用できます。
下記のマテビューでは、データ全体の件数ではなく、(現在から数えて)過去1分間の件数を出すようにしています。mz_logical_timestamp()
というのは、Materializeでいう現在時間を出す関数です(now()
と同じ)。
CREATE MATERIALIZED VIEW cnt_sliding AS SELECT symbol, COUNT(*) AS cnt FROM market_orders m WHERE EXTRACT(EPOCH FROM (ts + INTERVAL '1 minute'))::bigint * 1000 > mz_logical_timestamp() GROUP BY symbol
もうGIF動画はありませんが、こちらも以前と同様、実行するたび、最新の過去1分間の結果を返してくれます。
materialize=> SELECT * FROM cnt_sliding; symbol | cnt -------------+----- Apple | 46 Google | 46 Elerium | 40 Bespin Gas | 55 Linen Cloth | 47 (5 rows)
おわりに
意味不明なスピード(褒め言葉)でストリーミングデータの最新結果を取得できるMaterializeですが、クラウド上で簡単に準備することができるようになったことで、より気軽に利用できるようになりました。インフラの運用も考えなくていいので、リアルタイムなデータ分析を求めている方は、まずはテスト的に使用してみてはどうでしょうか。